package net.bwie.realtime.dy.dwd.log.funcation.zl.giftalertfunction;

import net.bw.realtime.dy.common.utils.DateTimeUtil;
import net.bwie.realtime.dy.dwd.log.bean.AlertGiftSender;

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Flink window function that emits an {@link AlertGiftSender} when the gift amount
 * reported for a key in a time window is strictly greater than {@link #THRESHOLD}.
 *
 * <p>Only the first element of the window's input iterable is read; it is treated as the
 * already-aggregated amount for the window. NOTE(review): this presumably relies on an
 * upstream incremental aggregation producing exactly one value per window — confirm
 * against the job that wires this function in.
 *
 * <p>Author: ZddddQ<br>
 * Created: 2025/5/28 14:13
 */
public class GiftAlertProcessFunction
        extends ProcessWindowFunction<Double, AlertGiftSender, String, TimeWindow> {

    /** Amount above which (exclusive) an alert is emitted. */
    private static final double THRESHOLD = 100.0;

    /** Pattern handed to {@code DateTimeUtil.convertLongToString} for the window bounds. */
    private static final String WINDOW_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /**
     * Inspects the window's amount and emits one alert if it exceeds the threshold.
     *
     * @param userId  key of the window; copied into the alert as the user id
     * @param context window context, used to read the window's start and end
     * @param amounts window input; only its first element is consumed
     * @param out     collector that receives the alert, if any
     */
    @Override
    public void process(
            String userId,
            Context context,
            Iterable<Double> amounts,
            Collector<AlertGiftSender> out
    ) {
        // The first (expected: only) element carries the aggregated amount.
        Double windowTotal = amounts.iterator().next();

        // Nothing to report unless the amount is strictly above the threshold.
        if (!(windowTotal > THRESHOLD)) {
            return;
        }

        TimeWindow window = context.window();
        // Window bounds are divided by 1000 before formatting, as the original code did.
        long startSeconds = window.getStart() / 1000;
        long endSeconds = window.getEnd() / 1000;

        AlertGiftSender giftAlert = new AlertGiftSender();
        giftAlert.setUserId(userId);
        giftAlert.setWindowStartTime(
                DateTimeUtil.convertLongToString(startSeconds, WINDOW_TIME_PATTERN));
        giftAlert.setWindowEndTime(
                DateTimeUtil.convertLongToString(endSeconds, WINDOW_TIME_PATTERN));
        giftAlert.setSendMoney(windowTotal);
        // Stamp the alert with the wall-clock time at which it was produced.
        giftAlert.setTs(System.currentTimeMillis());

        out.collect(giftAlert);
    }
}
